package com.carol.bigdata.task.feature.cag

import com.carol.bigdata.Config
import com.carol.bigdata.Config.hbaseParams
import com.carol.bigdata.constant.KVConstant
import com.carol.bigdata.utils.FeatureUtil.{calFeat, calTotal, changeProfileRDD, getDataRowKey}
import com.carol.bigdata.utils.RddReader.readHiveRDD
import com.carol.bigdata.utils.{Flag, HBaseUtil, TimeUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession


/**
  * Spark job that computes per-user "strategy tag" features for one game and one stat day.
  *
  * For each source event pattern (sub-game play, competition, task rewards, treasure rob,
  * ticket exchange, lottery buy) the job reads rows via `readHiveRDD`, aggregates them per key
  * with `calFeat`, writes the daily features to `userProfileTable` under the `strategy_tag`
  * column family, and then computes and writes the matching `*_total` columns via `calTotal`.
  */
object calStrategyTag {

    // Basic fields
    val keyColumns: List[String] = KVConstant.keyColumns
    val keyLen: Int = keyColumns.length
    val cf: String = KVConstant.cf
    val propertiesField: String = KVConstant.propertiesField

    // Data sources
    // Sub-game play table
    val subPlayTable: String = KVConstant.actionTable
    val subPlayColumns: List[String] = keyColumns :+ propertiesField
    val subPlayPattern: String = "cag_subgame_play"
    val subPlayPropIdx: Int = subPlayColumns.indexOf(propertiesField)
    val subPlayPropColumns: List[String] = List("subgame", "level", "duration", "win_count", "lose_count", "win_token")
    // Positions of "subgame" / "duration" in a row laid out as key columns followed by property columns
    val subGameIdx: Int = subPlayPropColumns.indexOf("subgame") + keyLen
    val durationIdx: Int = subPlayPropColumns.indexOf("duration") + keyLen

    // Sub-game competition table
    val competitionTable: String = KVConstant.actionTable
    val competitionColumns: List[String] = keyColumns :+ propertiesField
    val competitionPattern: String = "cag_subgame_competition"
    val competitionPropIdx: Int = competitionColumns.indexOf(propertiesField)
    val competitionPropColumns: List[String] = List("subgame", "competition_duration", "competition_reward_grade",
        "rank", "win_count", "lose_count", "win_token")

    // Sub-game task reward table
    val subTaskTable: String = KVConstant.actionTable
    val subTaskColumns: List[String] = keyColumns :+ propertiesField
    val subTaskPattern: String = "cag_subgame_task_reward"
    val subTaskPropIdx: Int = subTaskColumns.indexOf(propertiesField)
    val subTaskPropColumns: List[String] = List("subgame", "task_type")

    // Lobby task reward table
    val gameTaskTable: String = KVConstant.actionTable
    val gameTaskColumns: List[String] = keyColumns :+ propertiesField
    val gameTaskPattern: String = "task_reward"
    val gameTaskPropIdx: Int = gameTaskColumns.indexOf(propertiesField)
    val gameTaskPropColumns: List[String] = List("task_type")

    // Lobby treasure-rob table
    val treasureTable: String = KVConstant.actionTable
    val treasureColumns: List[String] = keyColumns
    val treasurePattern: String = "treasure_rob"

    // Lobby gift-ticket exchange table
    val exchangeTable: String = KVConstant.actionTable
    val exchangeColumns: List[String] = keyColumns :+ propertiesField
    val exchangePattern: String = "ticket_exchange"
    val exchangePropIdx: Int = exchangeColumns.indexOf(propertiesField)
    val exchangePropColumns: List[String] = List("is_virtual")

    // Lobby lottery purchase table
    val lotteryTable: String = KVConstant.actionTable
    val lotteryColumns: List[String] = keyColumns :+ propertiesField
    val lotteryPattern: String = "cag_lottery_buy"
    val lotteryPropIdx: Int = lotteryColumns.indexOf(propertiesField)
    val lotteryPropColumns: List[String] = List("ticket_count")

    // Daily-updated intermediate table
    // User statistics result table
    val userProfileTable: String = KVConstant.userProfileTable
    val keyCF: String = KVConstant.keyCF
    val keyCols: List[String] = KVConstant.dateKeyColumns
    val writeKeyCols: List[String] = KVConstant.writeKeyCols
    val event: String = "profile"

    // User gameplay feature fields (column family strategy_tag).
    // Each entry is (output column name, feature kind handed to calFeat / calTotal).
    val strategyMiddleCF: String = "strategy_tag"
    val subPlayFeat: List[(String, String)] = List(
        ("$subgame_play", "map"),
        ("$subgame_play_level", "map"),
        ("$subgame_play_duration", "split_map"),
        ("$subgame_win_count", "int"),
        ("$subgame_lose_count", "int"),
        ("$subgame_win_token", "int"),
        ("$subgame_play_sum_duration", "int")
    )
    val competitionFeat: List[(String, String)] = List(
        ("$competition_subgame", "map"),
        ("$competition_duration", "int"),
        ("$competition_reward_grade", "map"),
        ("$competition_rank", "map"),
        ("$competition_win_count", "int"),
        ("$competition_lose_count", "int"),
        ("$competition_win_token", "int")
    )
    val subTaskFeat: List[(String, String)] = List(
        ("$subgame_task", "map"),
        ("$subgame_task_type", "map")
    )
    val gameTaskFeat: List[(String, String)] = List(
        ("$game_task_type", "map")
    )
    val treasureFeat: List[(String, String)] = List(
        ("$game_treasure_rob_count", "int")
    )
    val exchangeFeat: List[(String, String)] = List(
        ("$game_exchange_type", "map"),
        ("$game_exchange_count", "int")
    )
    val lotteryFeat: List[(String, String)] = List(
        ("$game_lottery_ticket_count", "int"),
        ("$game_lottery_count", "int")
    )


    /** Prints the row count of `rdd` followed by up to five sample rows (driver-side debug output). */
    private def logSample[T](label: String, rdd: RDD[T]): Unit = {
        // Interpolate instead of println(a, b): two arguments would be adapted into a tuple and
        // printed as "(label: ,N)".
        println(s"$label: ${rdd.count()}")
        rdd.take(5).foreach(println)
    }


    /**
      * Aggregates raw rows per key and computes the features described by `featList`.
      *
      * Each row is split into its first `keyLen` columns (the key) and the remaining value columns.
      * Rows sharing a key are merged column-wise, so every value column becomes the list of all
      * values seen for that key; those per-column lists are then passed to `calFeat`.
      *
      * @param statDay   stat day, prepended to every output key
      * @param playUsers input rows: key columns followed by value columns
      * @param featList  (feature name, feature kind) pairs passed to `calFeat`
      * @return RDD of (statDay +: key, computed feature values); the RDD is cached
      */
    def calPlayInfo(statDay: String,
                    playUsers: RDD[List[String]],
                    featList: List[(String, String)]): RDD[(List[String], List[String])] = {

        val groupedByKey = playUsers
            .map { row =>
                val (key, values) = row.splitAt(keyLen)
                // Wrap every value in a singleton list so columns can be concatenated while reducing
                (key, values.map(List(_)))
            }
            .reduceByKey((left, right) => left.zip(right).map { case (l, r) => l ++ r })

        // The result is evaluated several times (count/take below, then the HBase write and
        // calTotal in run); cache it so the source read and the shuffle are not repeated each time.
        val playInfoRDD = groupedByKey
            .map { case (key, columns) => (statDay +: key, calFeat(columns, featList)) }
            .cache()

        println(s"playInfoRDD: ${playInfoRDD.count()}")
        println(s"feat: ${featList}")
        playInfoRDD.take(5).foreach(println)
        playInfoRDD
    }


    /**
      * Computes and persists the strategy features of one game for one stat day.
      *
      * @param hbaseParams HBase connection parameters handed to `HBaseUtil` and `calTotal`
      * @param spark       active Spark session
      * @param statDay     day to compute
      * @param game        game id
      */
    def run(hbaseParams: Map[String, String],
            spark: SparkSession,
            statDay: String,
            game: String): Unit = {
        println("计算策略特征")
        val yesterday: String = TimeUtil.getTimeDeltaDay(statDay, -1)
        val yesterdayRowKey = List(getDataRowKey(game, event, yesterday))

        // 1. Read the source data. All sources are read through readHiveRDD; the earlier
        //    HBaseUtil.readAsList-based reads that were kept here as commented-out code are dropped.

        // List(gameId,accountId,properties) => List(gameId,accountId,subGame,level,subGame:duration,win_count,lose_count,win_token,duration)
        val subGameUsers = readHiveRDD(spark, subPlayTable, statDay, subPlayPattern, game, keyColumns, subPlayPropColumns)
            .map { row =>
                // Replace the duration column with "subGame:duration" and append the raw duration
                val duration = row(durationIdx)
                row.updated(durationIdx, List(row(subGameIdx), duration).mkString(":")) :+ duration
            }
        logSample("subGameUsers", subGameUsers)

        // List(gameId,accountId,properties) => List(gameId,accountId,subGame,competitionDuration,competitionRewardGrade,rank,winCount,loseCount,winToken)
        val competitionUsers = readHiveRDD(spark, competitionTable, statDay, competitionPattern, game, keyColumns, competitionPropColumns)
        logSample("competitionUsers", competitionUsers)

        // List(gameId,accountId,properties) => List(gameId,accountId,subGame,taskType)
        val subTaskUsers = readHiveRDD(spark, subTaskTable, statDay, subTaskPattern, game, keyColumns, subTaskPropColumns)
        logSample("subTaskUsers", subTaskUsers)

        // List(gameId,accountId,properties) => List(gameId,accountId,taskType)
        val gameTaskUsers = readHiveRDD(spark, gameTaskTable, statDay, gameTaskPattern, game, keyColumns, gameTaskPropColumns)
        logSample("gameTaskUsers", gameTaskUsers)

        // List(gameId,accountId) => List(gameId,accountId,1)
        val treasureRobUsers = readHiveRDD(spark, treasureTable, statDay, treasurePattern, game, keyColumns, List())
            .map(_ :+ "1")
        logSample("treasureRobUsers", treasureRobUsers)

        // List(gameId,accountId,properties) => List(gameId,accountId,isVirtual,1)
        val exchangeUsers = readHiveRDD(spark, exchangeTable, statDay, exchangePattern, game, keyColumns, exchangePropColumns)
            .map(_ :+ "1")
        logSample("exchangeUsers", exchangeUsers)

        // List(gameId,accountId,properties) => List(gameId,accountId,ticketCount,1)
        val lotteryUsers = readHiveRDD(spark, lotteryTable, statDay, lotteryPattern, game, keyColumns, lotteryPropColumns)
            .map(_ :+ "1")
        logSample("lotteryUsers", lotteryUsers)

        // 2. Compute the daily features; dataList and featList are paired by position
        val dataList = List(subGameUsers, competitionUsers, subTaskUsers, gameTaskUsers, treasureRobUsers, exchangeUsers, lotteryUsers)
        val featList = List(subPlayFeat, competitionFeat, subTaskFeat, gameTaskFeat, treasureFeat, exchangeFeat, lotteryFeat)
        val rddLists = dataList.zip(featList).map { case (users, feats) => calPlayInfo(statDay, users, feats) }

        // Add the event and timezone fields
        val rddList = rddLists.map(dailyRDD => changeProfileRDD(dailyRDD))

        // 3. Write the daily features to the HBase intermediate table
        rddList.zip(featList).foreach { case (profileRDD, feats) =>
            val valueCols = feats.map(_._1)
            HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, profileRDD, keyCF, writeKeyCols, valueCols,
                List.fill(valueCols.length)(strategyMiddleCF))
        }

        // 4. Compute the totals and write them
        rddLists.zip(featList).foreach { case (dailyRDD, feats) =>
            val totalFeats = feats.map { case (name, kind) => (name + "_total", kind) }
            val totalIntColumns = totalFeats.filter(_._2.toLowerCase == "int").map(_._1)
            val totalMapColumns = totalFeats.filter(_._2.toLowerCase.contains("map")).map(_._1)
            val List(totalIntRDD, totalMapRDD) = calTotal(hbaseParams, spark, statDay, dailyRDD,
                userProfileTable, List(keyCF, strategyMiddleCF), keyCols, totalFeats, yesterdayRowKey)
            val writeTotalIntRDD = changeProfileRDD(totalIntRDD)
            val writeTotalMapRDD = changeProfileRDD(totalMapRDD)
            HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeTotalIntRDD, keyCF, writeKeyCols, totalIntColumns,
                List.fill(totalIntColumns.length)(strategyMiddleCF))
            HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeTotalMapRDD, keyCF, writeKeyCols, totalMapColumns,
                List.fill(totalMapColumns.length)(strategyMiddleCF))
        }
    }


    /**
      * Entry point: parses flags, builds a Hive-enabled Spark session and runs the job for the
      * hard-coded game list and date range below.
      */
    def main(args: Array[String]): Unit = {
        Flag.Parse(args)
        val spark = SparkSession.builder()
            .config(conf = Config.sparkConf)
            .enableHiveSupport()
            .getOrCreate()
        val sc = spark.sparkContext
        sc.setLogLevel("ERROR")

        val gameList = List("200")
        val timezoneList = List("Asia/Bangkok")
        try {
            for (game <- gameList) {
                // NOTE(review): timezone is not passed to run, so this loop only repeats the same
                // computation once per entry — confirm whether run should take a timezone.
                for (timezone <- timezoneList) {
                    for (day <- 1 to 1) {
                        val statday = "2021-08-%02d".format(day)
                        TimeUtil.timer(run(hbaseParams, spark, statday, game))
                    }
                }
            }
        } finally {
            // Stop the session even when a run fails
            spark.stop()
        }
    }

}
